package com.masonluo.mlonlinejudge.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.masonluo.mlonlinejudge.model.mq.JudgeMessage;
import com.masonluo.mlonlinejudge.service.SolutionService;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Spring configuration that creates and starts the RocketMQ producer and push
 * consumer used for judge requests.
 *
 * <p>Both clients connect to the name server configured under
 * {@code mq.rocket.namesrvaddr} and are created with the group configured under
 * {@code mq.rocket.judge.group}. The consumer subscribes to every tag of the
 * topic configured under {@code mq.rocket.judge.topic} and passes each decoded
 * {@link JudgeMessage} to {@link SolutionService#judge}.
 *
 * @author masonluo
 * @date 2021/2/5 2:56 PM
 */
@Configuration
public class RocketMQConfig {
    private static final Logger LOGGER = Logger.getLogger(RocketMQConfig.class.getName());

    /** Address of the RocketMQ name server, used by both the producer and the consumer. */
    @Value("${mq.rocket.namesrvaddr}")
    private String nameSrvAddr;

    /** Group name passed to both the producer and the consumer. */
    @Value("${mq.rocket.judge.group}")
    private String judgeGroup;

    /** Topic the consumer subscribes to. */
    @Value("${mq.rocket.judge.topic}")
    private String topic;

    /** Decodes message bodies into {@link JudgeMessage} instances. */
    @Autowired
    private ObjectMapper mapper;

    /** Receives every successfully decoded judge request. */
    @Autowired
    private SolutionService solutionService;

    /**
     * Creates and starts the producer bean.
     *
     * <p>{@code destroyMethod} is spelled out so it is obvious that the client is
     * shut down together with the application context.
     *
     * @return a started producer
     * @throws MQClientException if the producer cannot be started
     */
    @Bean(destroyMethod = "shutdown")
    public DefaultMQProducer syncProducer() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer(judgeGroup);
        producer.setNamesrvAddr(nameSrvAddr);
        producer.start();
        return producer;
    }

    /**
     * Creates and starts the push-consumer bean that processes judge requests.
     *
     * @return a started consumer with its listener registered
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    @Bean(destroyMethod = "shutdown")
    public DefaultMQPushConsumer consumer() throws MQClientException {
        return init();
    }

    /**
     * Builds the consumer, registers the message listener and starts it.
     *
     * @return the started consumer
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    private DefaultMQPushConsumer init() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(judgeGroup);
        consumer.setNamesrvAddr(nameSrvAddr);
        consumer.subscribe(topic, "*");
        // Deliver one message per listener invocation.
        consumer.setConsumeMessageBatchMaxSize(1);
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            if (msgs == null || msgs.isEmpty()) {
                // Nothing to process; acknowledge instead of failing on get(0).
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            if (msgs.size() > 1) {
                // The listener handles exactly one message at a time; ask for the
                // batch to be delivered again rather than processing part of it.
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return handle(msgs.get(0));
        });
        consumer.start();
        return consumer;
    }

    /**
     * Decodes one message and forwards it to {@link SolutionService#judge}.
     *
     * <p>A body that cannot be decoded will never become decodable, so it is logged
     * and acknowledged. A failure while judging is logged and reported as
     * {@code RECONSUME_LATER} so the request is not silently lost. {@link Error}s
     * are intentionally not intercepted here.
     *
     * @param msg the message to process
     * @return the consume status to report for this message
     */
    private ConsumeConcurrentlyStatus handle(MessageExt msg) {
        JudgeMessage param;
        try {
            param = mapper.readValue(msg.getBody(), JudgeMessage.class);
        } catch (IOException | RuntimeException e) {
            // Jackson could not decode the body: acknowledge so this message is not consumed again.
            LOGGER.log(Level.SEVERE, "Dropping undecodable judge message " + msg.getMsgId(), e);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

        LOGGER.log(Level.INFO, "Received judge message {0}: {1}", new Object[] {msg.getMsgId(), param});

        try {
            solutionService.judge(param);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE,
                    "Judging failed for message " + msg.getMsgId() + "; requesting redelivery", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}
